In [ ]:
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

2.1. Google Cloud Storage (CSV) & Spark DataFrames - Python

Create Dataproc Cluster with Jupyter

This notebook is designed to be run on Google Cloud Dataproc.

Follow the links below for instructions on how to create a Dataproc cluster with the Jupyter component installed.

Python 3 Kernel

Use a Python 3 kernel (not PySpark) so that you can configure the SparkSession in the notebook.

Create Spark Session


In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
  .appName('2.1. Google Cloud Storage (CSV) & Spark DataFrames') \
  .getOrCreate()
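
Because the session is built in the notebook rather than supplied by a PySpark kernel, additional Spark properties can be passed through the builder if needed. A minimal sketch with placeholder values (the property settings below are illustrative assumptions, not recommendations, and builder configs only take full effect when the session is first created):

In [ ]:
# Hypothetical example: pass extra Spark properties through the builder.
# The values are placeholders; tune them for your cluster.
spark = SparkSession.builder \
  .appName('2.1. Google Cloud Storage (CSV) & Spark DataFrames') \
  .config('spark.sql.shuffle.partitions', '48') \
  .config('spark.executor.memory', '4g') \
  .getOrCreate()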

Enable repl.eagerEval

This outputs the results of DataFrames at each step without needing to call df.show(), and it also improves the formatting of the output.


In [2]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
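
The amount of output rendered by eager evaluation can also be tuned via the related Spark SQL properties; a small sketch (the values shown are just examples):

In [ ]:
# Optional: control how much of each DataFrame eager evaluation renders.
spark.conf.set("spark.sql.repl.eagerEval.maxNumRows", 20)  # rows displayed
spark.conf.set("spark.sql.repl.eagerEval.truncate", 100)   # max characters per cell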

List files in a GCS bucket

List files in a Google Cloud Storage bucket using the google-cloud-storage Python library, which comes preinstalled on Dataproc clusters. We will be using a publicly available dataset.


In [3]:
from google.cloud import storage

gcs_client = storage.Client()
bucket = gcs_client.bucket('solutions-public-assets')

list(bucket.list_blobs(prefix='time-series-master/'))


Out[3]:
[<Blob: solutions-public-assets, time-series-master/, 1423455996970000>,
 <Blob: solutions-public-assets, time-series-master/GBPUSD_2014_01.csv, 1423456343320000>,
 <Blob: solutions-public-assets, time-series-master/GBPUSD_2014_02.csv, 1423456332787000>]
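
If only the object names and sizes are needed rather than the full Blob representations, the same listing can be printed more compactly, for example:

In [ ]:
# Print just the name and size (in bytes) of each object under the prefix.
for blob in bucket.list_blobs(prefix='time-series-master/'):
    print(blob.name, blob.size)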

Alternatively, use the hdfs command-line tool to list files in a directory; it also supports GCS buckets.


In [4]:
!hdfs dfs -ls 'gs://solutions-public-assets/time-series-master'


Found 2 items
-rwx------   3 root root   67868938 2015-02-09 04:32 gs://solutions-public-assets/time-series-master/GBPUSD_2014_01.csv
-rwx------   3 root root   61275261 2015-02-09 04:32 gs://solutions-public-assets/time-series-master/GBPUSD_2014_02.csv
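
The gsutil command, typically available on Dataproc clusters as part of the Cloud SDK, is another option (this assumes gsutil is on the PATH):

In [ ]:
# Assumes the Cloud SDK (gsutil) is available on the cluster.
!gsutil ls -l gs://solutions-public-assets/time-series-master/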

Read CSV files from GCS into a Spark DataFrame

Read the CSV files from GCS into a DataFrame and infer the schema


In [5]:
df1 = spark \
  .read \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .csv("gs://solutions-public-assets/time-series-master/GBPUSD_*.csv")

df1.printSchema()


root
 |-- XYZ: string (nullable = true)
 |-- GBP/USD: string (nullable = true)
 |-- 2014-01-01 00:00:00.000000: timestamp (nullable = true)
 |-- 1.4995: double (nullable = true)
 |-- 1.5005: double (nullable = true)


In [17]:
df1


Out[17]:
XYZ | GBP/USD | 2014-01-01 00:00:00.000000 | 1.4995 | 1.5005
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4988 | 1.4998
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4979 | 1.4989
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4993 | 1.5003
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4989 | 1.4999
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4998 | 1.5008
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.5001 | 1.5011
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4991 | 1.5001
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4978 | 1.4988
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4974 | 1.4984
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4987 | 1.4997
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4979 | 1.4989
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4979 | 1.4989
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4991 | 1.5001
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4997 | 1.5007
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4996 | 1.5006
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4998 | 1.5008
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4995 | 1.5005
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4992 | 1.5002
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4979 | 1.4989
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.498 | 1.499
only showing top 20 rows

If there is no header row with column names, as is the case with this dataset, or the schema is not inferred correctly, then read the CSV files from GCS and define the schema explicitly.


In [6]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType, DateType

schema = StructType([
    StructField("venue", StringType()),
    StructField("currencies", StringType()),
    StructField("time_stamp", TimestampType()),
    StructField("bid", DoubleType()),
    StructField("ask", DoubleType())
])

df2 = spark \
  .read \
  .schema(schema) \
  .csv("gs://solutions-public-assets/time-series-master/GBPUSD_*.csv")

df2.printSchema()


root
 |-- venue: string (nullable = true)
 |-- currencies: string (nullable = true)
 |-- time_stamp: timestamp (nullable = true)
 |-- bid: double (nullable = true)
 |-- ask: double (nullable = true)
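
As an alternative to building a StructType, DataFrameReader.schema also accepts a DDL-formatted string. A minimal sketch of the same schema (df2_ddl is just an illustrative name):

In [ ]:
# Equivalent schema expressed as a DDL string.
df2_ddl = spark \
  .read \
  .schema("venue STRING, currencies STRING, time_stamp TIMESTAMP, bid DOUBLE, ask DOUBLE") \
  .csv("gs://solutions-public-assets/time-series-master/GBPUSD_*.csv")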

View the top 20 rows of the Spark DataFrame


In [7]:
df2


Out[7]:
venue | currencies | time_stamp | bid | ask
XYZ | GBP/USD | 2014-01-01 00:00:00 | 1.4995 | 1.5005
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4988 | 1.4998
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4979 | 1.4989
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4993 | 1.5003
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4989 | 1.4999
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4998 | 1.5008
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.5001 | 1.5011
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4991 | 1.5001
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4978 | 1.4988
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4974 | 1.4984
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4987 | 1.4997
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4979 | 1.4989
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4979 | 1.4989
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4991 | 1.5001
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4997 | 1.5007
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4996 | 1.5006
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4998 | 1.5008
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4995 | 1.5005
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4992 | 1.5002
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4979 | 1.4989
only showing top 20 rows

Print the shape of the DataFrame (number of rows and number of columns)


In [8]:
print((df2.count(), len(df2.columns)))


(2436683, 5)

Add an hour column and filter the data to create a new DataFrame with only one day of data


In [9]:
import pyspark.sql.functions as F

df3 = df2.withColumn("hour", F.hour(F.col("time_stamp"))) \
  .filter(df2['time_stamp'] >= F.lit('2014-01-01 00:00:00')) \
  .filter(df2['time_stamp'] < F.lit('2014-01-02 00:00:10')).cache()

df3


Out[9]:
venue | currencies | time_stamp | bid | ask | hour
XYZ | GBP/USD | 2014-01-01 00:00:00 | 1.4995 | 1.5005 | 0
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4988 | 1.4998 | 0
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4979 | 1.4989 | 0
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4993 | 1.5003 | 0
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4989 | 1.4999 | 0
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4998 | 1.5008 | 0
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.5001 | 1.5011 | 0
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4991 | 1.5001 | 0
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4978 | 1.4988 | 0
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4974 | 1.4984 | 0
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4987 | 1.4997 | 0
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4979 | 1.4989 | 0
XYZ | GBP/USD | 2014-01-01 00:00:... | 1.4979 | 1.4989 | 0
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4991 | 1.5001 | 0
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4997 | 1.5007 | 0
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4996 | 1.5006 | 0
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4998 | 1.5008 | 0
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4995 | 1.5005 | 0
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4992 | 1.5002 | 0
XYZ | GBP/USD | 2014-01-01 00:01:... | 1.4979 | 1.4989 | 0
only showing top 20 rows

In [10]:
print((df3.count(), len(df3.columns)))


(41390, 6)
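
A quick way to sanity-check the filter is to count the rows in each hour of the filtered day; a minimal sketch:

In [ ]:
# Sanity check: row counts per hour of the filtered day.
df3.groupBy("hour").count().orderBy("hour")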

Group by hour and order by total_bids


In [11]:
import pyspark.sql.functions as F

df4 = df3 \
.groupBy("hour") \
.agg(F.sum('bid').alias('total_bids'))

df4.orderBy('total_bids', ascending=False)


Out[11]:
hour | total_bids
12 | 4888.966399999975
13 | 4852.239699999989
14 | 4569.660199999988
15 | 4518.744800000002
8 | 2489.1048999999966
10 | 2431.1414000000077
9 | 2424.1796000000018
18 | 2368.89479999999
19 | 2355.5363999999986
11 | 2347.3602999999907
17 | 2300.944100000002
16 | 2295.8520999999996
4 | 1749.3830000000016
5 | 1726.0291000000027
6 | 1662.2480999999957
7 | 1648.9573000000023
20 | 1572.7295999999976
21 | 1558.4258999999997
23 | 1546.2942000000016
22 | 1512.3183999999997
only showing top 20 rows
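
The same aggregation can carry additional metrics if needed; for example, an average bid alongside the total (df4b and avg_bid are illustrative names):

In [ ]:
# Hypothetical extension: total and average bid per hour.
df4b = df3 \
  .groupBy("hour") \
  .agg(F.sum("bid").alias("total_bids"),
       F.avg("bid").alias("avg_bid"))

df4b.orderBy("total_bids", ascending=False)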

Write the Spark DataFrame to Google Cloud Storage in CSV format

Write the Spark DataFrame to Google Cloud Storage using df.write

If the GCS bucket does not exist, it will need to be created before running df.write
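
If the bucket does need to be created first, the google-cloud-storage client used earlier can do that too; a minimal sketch (the bucket name and location are placeholders):

In [ ]:
# Create the destination bucket if it does not already exist.
# 'dataproc-bucket-name' and the location are placeholders; use your own values.
new_bucket = gcs_client.create_bucket('dataproc-bucket-name', location='US')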


In [12]:
# Update to your GCS bucket
gcs_bucket = 'dataproc-bucket-name'

gcs_filepath = 'gs://{}/currency/hourly_bids.csv'.format(gcs_bucket)

df4.coalesce(1).write \
  .mode('overwrite') \
  .csv(gcs_filepath)

Read the CSV file into a new DataFrame to check that it was successfully saved


In [13]:
!hdfs dfs -ls gs://dataproc-bucket-name/currency


Found 1 items
drwx------   - root root          0 2020-03-27 17:08 gs://dataproc-bucket-name/currency/hourly_bids.csv

In [14]:
df5 = spark.read \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .csv('gs://dataproc-bucket-name/currency/*')

df5


Out[14]:
12 | 4888.966399999975
22 | 1512.3183999999997
1 | 1040.8376999999998
13 | 4852.239699999989
6 | 1662.2480999999957
16 | 2295.8520999999996
3 | 1032.2795000000003
20 | 1572.7295999999976
5 | 1726.0291000000027
19 | 2355.5363999999986
15 | 4518.744800000002
9 | 2424.1796000000018
17 | 2300.944100000002
4 | 1749.3830000000016
8 | 2489.1048999999966
23 | 1546.2942000000016
7 | 1648.9573000000023
10 | 2431.1414000000077
21 | 1558.4258999999997
11 | 2347.3602999999907
14 | 4569.660199999988
only showing top 20 rows
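
Note that the CSV was written without a header row, so reading it back with the header option set to true treats the first data row as column names, which is why the column headings above look like values. One option, sketched below, is to write with a header and then re-run the read:

In [ ]:
# Optional: include a header row so the file reads back with named columns.
df4.coalesce(1).write \
  .mode('overwrite') \
  .option('header', 'true') \
  .csv(gcs_filepath)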